package com.springboot.learn.retry.message;

import com.springboot.learn.retry.model.MyRetryContext;
import com.springboot.learn.retry.model.RetryTargetInfo;
import com.springboot.learn.retry.repository.db.Retry;
import com.springboot.learn.retry.repository.db.RetryMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Repository;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;

@Slf4j
@Repository
public class RetryMessageDelayQueueImpl extends RetryMessage implements InitializingBean {

    @Autowired
    private RetryMapper retryMapper;
    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    private DelayQueue<MyRetryContext> queue = new DelayQueue<>();


    @PostConstruct
    public void initFromDb() {
        List<Retry> retries = retryMapper.selectTask();
        List<MyRetryContext> contexts = new ArrayList<>();
        for (Retry retry : retries) {
            MyRetryContext myRetryContext = new MyRetryContext();
            RetryTargetInfo retryTargetInfo = RetryTargetInfo.RetryTargetInfoJsonParser.of(retry.getClassName(), retry.getMethodName(), retry.getArgsType(), retry.getArgs());
            myRetryContext.setRetryTargetInfo(retryTargetInfo);
            myRetryContext.setStartTime(retry.getStartTime().getTime());
            myRetryContext.setTime(retry.getRetryTime().getTime());
            myRetryContext.setUuid(retry.getUuid());
            myRetryContext.setStatus(retry.getStatus());
            myRetryContext.setRetryCount(retry.getRetryCount());
            contexts.add(myRetryContext);
        }
        queue.addAll(contexts);
    }

    @Override
    public void consumeAt(Object object, Long time) {
        queue.put((MyRetryContext) object);
        log.info("size {} -> {}", queue.size() - 1, queue.size());
    }

    @Override
    public void afterPropertiesSet() {
        threadPoolTaskExecutor.execute(() -> {
            try {
                while (true) {
                    if (countObservers() == 0) {
                        Thread.sleep(100);
                        continue;
                    }
                    MyRetryContext take = queue.take();
                    log.info("获取重试任务成功，任务数 {} -> {}", queue.size() + 1, queue.size());
                    // 执行重试
                    setChanged();
                    notifyObservers(take);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}
